Skip to content

Conversation

Copy link

Copilot AI commented Sep 29, 2025

This PR implements a comprehensive tiling transformation engine for Feast's streaming feature processing, inspired by Chronon's tiled architecture. The implementation enables efficient temporal windowing and aggregation for streaming data by dividing time-series into manageable chunks (tiles), with full ComputeEngine integration, Feast Aggregation support, and comprehensive testing infrastructure.

Overview

Tiling addresses key challenges in streaming feature engineering:

  • Memory efficiency: Process large time-series data in bounded chunks
  • Low latency: Incremental processing reduces computation overhead
  • Temporal continuity: Chain features across time boundaries for derived metrics
  • Late data handling: Configurable overlap windows handle out-of-order events
  • Scalability: Independent tile processing enables horizontal scaling

Key Components

Core Architecture

  • TileConfiguration: Configures tile size, window size, overlap, memory limits, and late data handling
  • TiledTransformation: Base class inheriting from Transformation for ComputeEngine execution
  • TransformationMode.TILING: New transformation mode integrated with existing framework

ComputeEngine Integration

  • Engine-agnostic design: Works with Spark, Ray, and other ComputeEngines
  • Mode specification at StreamFeatureView level: Consistent with Feast architecture
  • Distributed execution: Transformations executed by ComputeEngine for scalability
  • Aggregation support: Both Feast Aggregation objects and custom functions

Advanced Features

  • Feast Aggregation Objects: Native support for window-based aggregations
  • Custom Aggregation Functions: Within-tile computations for efficient processing
  • Chaining Functions: Cross-tile feature derivation for temporal continuity
  • Memory Management: Configurable tile cache with automatic cleanup
  • Late Data Support: Handles out-of-order events in streaming pipelines

Usage Examples

Basic Tiling with ComputeEngine

from feast.transformation import tiled_transformation
from feast import Field, Aggregation
from feast.types import Float64, Int64
from datetime import timedelta

@tiled_transformation(
    sources=["transaction_source_fv"],    # Source dependencies for DAG
    schema=[                              # Output schema for UI rendering
        Field(name="rolling_avg", dtype=Float64),
        Field(name="cumulative_amount", dtype=Float64),
        Field(name="transaction_count", dtype=Int64),
    ],
    tile_size=timedelta(hours=1),         # Process data in 1-hour tiles
    window_size=timedelta(minutes=30),    # Window size for aggregations within tiles
    overlap=timedelta(minutes=5),         # 5-minute overlap between tiles
    aggregations=[                        # Feast Aggregation objects
        Aggregation(column="transaction_amount", function="sum", time_window=timedelta(minutes=30)),
        Aggregation(column="transaction_amount", function="mean", time_window=timedelta(minutes=30)),
    ],
    aggregation_functions=[               # Custom aggregation functions
        lambda df: df.groupby('entity_id').agg({
            'transaction_amount': ['count']
        }).reset_index()
    ]
)
def hourly_transaction_features(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(
        rolling_avg=df['transaction_amount'].rolling(window=10).mean(),
        cumulative_amount=df['transaction_amount'].cumsum(),
        transaction_count=df.groupby('entity_id').cumcount() + 1
    )

StreamFeatureView Integration

# Mode specified at StreamFeatureView level for ComputeEngine
stream_fv = StreamFeatureView(
    name="transaction_features",
    feature_transformation=hourly_transaction_features,
    source=kafka_source,
    mode="spark",  # ComputeEngine mode specified here
    entities=["customer_id"]
)

Advanced Chaining

@tiled_transformation(
    sources=["transaction_hourly_fv"],
    schema=[
        Field(name="local_cumsum", dtype=Float64),
        Field(name="global_cumsum", dtype=Float64),
    ],
    tile_size=timedelta(hours=1),
    window_size=timedelta(minutes=30),
    chaining_functions=[
        # Chain cumulative features across tiles
        lambda prev_df, curr_df: chain_cumulative_totals(prev_df, curr_df)
    ]
)
def chained_features(df: pd.DataFrame) -> pd.DataFrame:
    return compute_features_with_continuity(df)

# Use with StreamFeatureView (mode specified at view level)
stream_fv = StreamFeatureView(
    name="chained_features",
    feature_transformation=chained_features,
    source=kafka_source,
    mode="spark"  # ComputeEngine mode specified here
)

API Design

The implementation follows Feast's established patterns with proper ComputeEngine integration:

  • Engine-agnostic transformations: No mode parameter in transformation decorator
  • ComputeEngine execution: Mode specified at StreamFeatureView level for distributed processing
  • DAG Integration: sources parameter specifies feature view dependencies for proper DAG construction
  • UI Integration: schema parameter defines output features with names and types for UI rendering
  • Aggregation support: Both Feast Aggregation objects and custom aggregation functions
  • Consistent Pattern: Follows the same design as other Feast transformation decorators

Configuration Options

  • sources: List of source feature views or data sources for DAG construction
  • schema: List of Field definitions specifying output feature names and data types
  • aggregations: List of Feast Aggregation objects for window-based aggregations
  • tile_size: Duration of each time tile (e.g., timedelta(hours=1))
  • window_size: Window size for aggregations within tiles (defaults to tile_size)
  • overlap: Optional overlap between tiles for continuity
  • max_tiles_in_memory: Maximum number of tiles to keep in memory (default: 10)
  • enable_late_data_handling: Whether to handle late-arriving data (default: True)
  • aggregation_functions: Custom functions to apply within each tile
  • chaining_functions: Functions to chain results across tiles for derived features

Note: The transformation mode (e.g., "spark", "pandas") is specified at the StreamFeatureView level, not within the transformation itself. This allows the same tiled transformation to work with different ComputeEngines.

Testing & Template Integration

Comprehensive Test Coverage

  • Unit Tests: Core tiled transformation functionality in sdk/python/tests/unit/transformation/test_tiled_transformation.py
  • Integration Tests: StreamFeatureView integration and ComputeEngine compatibility in sdk/python/tests/unit/transformation/test_tiled_transformation_integration.py
  • Template Integration: Production-ready example in sdk/python/feast/templates/local/feature_repo/example_repo.py

Template Example

The implementation includes a comprehensive example in the Feast project template:

@tiled_transformation(
    sources=[driver_stats_fv],
    schema=[
        Field(name="rolling_avg_trips", dtype=Float64),
        Field(name="cumulative_trips", dtype=Int64),
        Field(name="trip_velocity", dtype=Float64),
    ],
    tile_size=timedelta(hours=1),
    window_size=timedelta(minutes=30),
    aggregations=[
        Aggregation(column="avg_daily_trips", function="sum", time_window=timedelta(minutes=30)),
    ],
)
def driver_tiled_features(df: pd.DataFrame) -> pd.DataFrame:
    """Tiled transformation for efficient streaming driver feature processing."""
    # Calculate rolling features, cumulative metrics, and velocity within tiles
    return processed_df

Documentation & Examples

  • Updated feature transformation documentation with comprehensive tiling section including ComputeEngine integration
  • Created example suite demonstrating basic usage, advanced patterns, and streaming integration
  • Production-ready examples available in Feast project templates
  • Comprehensive test coverage for all tiling functionality with proper test organization

Benefits

This implementation provides:

  • Efficient streaming processing for large-scale temporal data with ComputeEngine execution
  • Memory-bounded operations preventing OOM issues with long-running streams
  • Temporal feature continuity across processing boundaries
  • Production-ready patterns for real-time analytics and fraud detection
  • Seamless integration with existing Feast transformation framework and ComputeEngines
  • Full DAG and UI support through proper metadata specification
  • Flexible execution modes supporting Spark, Ray, and other distributed engines
  • Comprehensive testing infrastructure ensuring reliability and maintainability
  • Template integration making tiled transformations accessible to all new Feast projects

The tiling engine enables use cases like real-time fraud detection, customer behavior analytics, and temporal aggregations that require both efficiency and temporal continuity in streaming environments, while providing full integration with Feast's feature lineage, UI systems, distributed compute infrastructure, and development workflow.

Original prompt

I want to extend feature transformation / transformation engine in Feast to support Tiling in streaming.

It should largely be based on Chronon's approach here:

https://chronon.ai/Tiled_Architecture.html
https://chronon.ai/Python_API.html
https://chronon.ai/authoring_features/ChainingFeatures.html
https://chronon.ai/authoring_features/DerivedFeatures.html


💬 Share your feedback on Copilot coding agent for the chance to win a $200 gift card! Click here to start the survey.

Copilot AI changed the title I want to extend feature transformation / transformation engine in Feast to support Tiling in streaming. It should largely be based on Chronon's approach here: https://chronon.ai/Tiled_Architecture.html https://chronon.ai/Python_API.html https://chr... Implement Chronon-style tiling transformation engine for streaming features Sep 29, 2025
Copilot finished work on behalf of franciscojavierarceo September 29, 2025 14:55
from feast.transformation.mode import TransformationMode


class TileConfiguration:
Copy link
Member

@franciscojavierarceo franciscojavierarceo Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar to on_demand_feature_view we should just support a mode parameter for the tiled transformation and pandas should be one of them, rather than create an entire new decorator for each (i.e., requiring the usage of from feast.transformation import pandas_tiled_transformation). Instead, we should have something like:

from feast.transformation import tiled_transformation
from datetime import timedelta

@tiled_transformation(
    mode="pandas",
    tile_size=timedelta(hours=1),
    overlap=timedelta(minutes=5),
    aggregation_functions=[
        lambda df: df.groupby('customer_id').agg({
            'amount': ['sum', 'mean', 'count']
        }).reset_index()
    ]
)
def hourly_transaction_features(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(
        rolling_avg=df['amount'].rolling(window=10).mean(),
        cumulative_sum=df['amount'].cumsum()
    )

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot please review and revise.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored the API to follow the on_demand_feature_view pattern as requested. The tiled_transformation decorator now accepts a mode parameter instead of having separate decorators for each processing mode.

New API:

from feast.transformation import tiled_transformation

@tiled_transformation(
    tile_size=timedelta(hours=1),
    mode="pandas",  # Mode parameter added
    overlap=timedelta(minutes=5),
    aggregation_functions=[...]
)
def hourly_transaction_features(df): ...

Changes implemented in commit fefd284:

  • Updated tiled_transformation decorator with mode="pandas" parameter
  • Removed separate pandas_tiled_transformation decorator
  • Updated all documentation, examples, and tests to use unified API
  • Maintains all existing functionality while following established Feast patterns

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a new class/decorator? I think it could be just an Aggregation attribute.

…nd_feature_view pattern

Co-authored-by: franciscojavierarceo <[email protected]>
Copilot finished work on behalf of franciscojavierarceo September 29, 2025 15:57
tile_size=timedelta(hours=1), # Process data in 1-hour tiles
mode="pandas", # Use pandas processing mode
overlap=timedelta(minutes=5), # 5-minute overlap between tiles
aggregation_functions=[

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot this omits the schema and variable names, those are important primitives to retain so that we can accurately construct the DAG, as well as render the information in the UI.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the missing sources and schema parameters to the tiled_transformation decorator following the on_demand_feature_view pattern.

The updated API now includes:

  • sources: List of source feature views/data sources for DAG construction
  • schema: List of Field definitions with names and types for UI rendering

Updated documentation examples show the complete pattern:

@tiled_transformation(
    sources=["transaction_source_fv"],
    schema=[
        Field(name="rolling_avg", dtype=Float64),
        Field(name="cumulative_amount", dtype=Float64),
    ],
    tile_size=timedelta(hours=1),
    mode="pandas",
    # ...
)

These primitives enable proper DAG construction and feature metadata rendering in the UI. Commit: 915896a

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now this is inconsistent with the StreamFeatureView example below. These need to b e aligned.

)

# Usage in StreamFeatureView
stream_fv = StreamFeatureView(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't make sense to pass in a pandas data frame transformation into the Stream Feature View, does it? I suppose it may, we support the "spark" mode today. But this isn't compatible with it as the "mode" is passed into the tiled_transformation vs the StreamFeatureView, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're absolutely right! I've refactored the architecture to fix this inconsistency.

The mode is now specified at the StreamFeatureView level (where it belongs), not within the transformation itself:

@tiled_transformation(
    tile_size=timedelta(hours=1),
    # mode parameter removed from here
)
def my_transformation(df): ...

# Use with StreamFeatureView - mode specified here
stream_fv = StreamFeatureView(
    feature_transformation=my_transformation,
    mode="spark",  # ComputeEngine mode specified at view level
    source=kafka_source
)

This makes it consistent with Feast's architecture where transformations are engine-agnostic and the execution mode is determined by the feature view. Commit: 1932fff

…n_demand_feature_view pattern

Co-authored-by: franciscojavierarceo <[email protected]>
Copilot AI changed the title Implement Chronon-style tiling transformation engine for streaming features Implement Chronon-style tiling transformation engine for streaming features with DAG and UI integration Sep 29, 2025
Copilot finished work on behalf of franciscojavierarceo September 29, 2025 20:04
import random


def tiled_transformation(tile_size, mode="pandas", overlap=None, max_tiles_in_memory=10,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be based on the Transformation class and use the ComputeEngine class as well.

Transformations should be executed on a ComputeEngine. The Tiling should allow usage of an Aggregation as well.

We also need to support a window_size in the class as well as the max tiles.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot please adjust.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored the implementation to address all these architectural requirements:

  1. Based on Transformation class: TiledTransformation now properly inherits from the base Transformation class
  2. ComputeEngine integration: Transformations are designed to be executed by ComputeEngine (Spark, Ray, etc.)
  3. Aggregation support: Added support for Feast Aggregation objects alongside custom aggregation functions
  4. Added window_size parameter: Separate from tile_size for aggregations within tiles
  5. Added max_tiles parameter: For memory management

Enhanced API:

@tiled_transformation(
    aggregations=[  # Feast Aggregation objects
        Aggregation(column="amount", function="sum", time_window=timedelta(minutes=30))
    ],
    tile_size=timedelta(hours=1),
    window_size=timedelta(minutes=30),  # New parameter
    max_tiles_in_memory=10,  # New parameter
)
def my_transformation(df): ...

The transformation now works with ComputeEngine for distributed execution and follows Feast's established transformation patterns. Commit: 1932fff

@@ -0,0 +1,246 @@
"""
Streaming Integration Example

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should be under sdk/tests/unit/

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the streaming integration test to the proper location. Created sdk/python/tests/unit/transformation/test_tiled_transformation_integration.py with comprehensive integration tests for:

  • Tiled transformation creation and configuration
  • StreamFeatureView compatibility
  • ComputeEngine integration
  • Aggregation support (both Feast objects and custom functions)
  • Memory management

Removed the example-based streaming integration file. Commit: 29ee76d

@@ -0,0 +1,253 @@
"""
Advanced Tiled Transformation Example

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot rather than add this documented example, let's add to the sdk/python/feast/templates//local/feature_repo/example_repo.py instead and update the tests?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comprehensive tiled transformation example to sdk/python/feast/templates/local/feature_repo/example_repo.py. The example demonstrates:

@tiled_transformation(
    sources=[driver_stats_fv],
    schema=[Field(name="rolling_avg_trips", dtype=Float64), ...],
    tile_size=timedelta(hours=1),
    window_size=timedelta(minutes=30),
    aggregations=[Aggregation(column="avg_daily_trips", function="sum", ...)],
)
def driver_tiled_features(df): ...

This provides a production-ready example that will be available in all new Feast projects created from templates, with proper ComputeEngine integration patterns and comprehensive documentation. Commit: 29ee76d

…Aggregation objects

Co-authored-by: franciscojavierarceo <[email protected]>
Copilot AI changed the title Implement Chronon-style tiling transformation engine for streaming features with DAG and UI integration Implement Chronon-style tiling transformation engine for streaming features with ComputeEngine and Aggregation support Sep 29, 2025
Copilot finished work on behalf of franciscojavierarceo September 29, 2025 20:16
… to example template

Co-authored-by: franciscojavierarceo <[email protected]>
Copilot AI changed the title Implement Chronon-style tiling transformation engine for streaming features with ComputeEngine and Aggregation support Implement Chronon-style tiling transformation engine for streaming features with ComputeEngine, Aggregation support, and comprehensive testing Sep 29, 2025
Copilot finished work on behalf of franciscojavierarceo September 29, 2025 20:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants